/*
 *  Filename:lsv_gc.h
 *  Description:
 *
 *  Created on: 2017年3月17日
 *  Author: Asdf(825674301)
 */

#ifndef __LSV_GC_H_
#define __LSV_GC_H_

/**
 * @file lsv_gc.h
 * @brief GC module.
 *
 * Generational GC:
 * - The CQ + heap form the young generation
 * - The old storage forms the old generation
 *
 * CQ (check queue):
 * - gc task
 *
 * Heap:
 * - size: 1023
 *
 * Old storage:
 * - organized as a FIFO
 * - the ready chunk is the tail and buffers new records
 *
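 * Tasks:
 * - gc task (may be interrupted; SIMD concurrency mode)
 * - valueup task (not interrupted)
 *
 * When a new log is generated (e.g. a front-end log, gc merge, fullgc), it first enters the CQ.
 * Following certain rules, logs are taken from the CQ, gc-checked, and put into the heap.
 *
 * Logs in the heap are either gc-freed or gc-merged.
 * If the heap is full, half of it sinks into the old storage.
 *
 * The old storage is organized as a FIFO; fullgc is run on it according to policy.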
 *
 * @todo interaction with dedup
 * @todo gc check performance optimization
 * @todo new gc algorithm
 *
 * Open questions:
 * - performance
 * - QoS
 * - recovery
 *
 */

#include "coroutine.h"
#include "timer.h"

#include "lsv_conf.h"
#include "lsv_types.h"
#include "lsv_volume_proto.h"
#include "lsv_log_page_bitmap.h"

/* Compile-time switch: enables the bitmap members of lsv_gc_context_t (see the #ifdef there). */
#define IS_STORAGE_BITMAP
/* Compile-time switch: enables lsv_gc_statistics_t, DINFO_LOGGC_STATISTICS and the stat_info member. */
#define LSV_GC_STATISTICS

/* Width, in bits, of one bitmap entry. */
#define LSV_GC_STORAGE_BIT_LEN    (1)
/*
 * Number of LSV_GC_STORAGE_BIT_LEN-wide entries that fit in LSV_CHUNK_SIZE bytes
 * (assumes LSV_CHUNK_SIZE is a byte count -- TODO confirm in lsv_conf.h).
 */
#define LSV_GC_STORAGE_CHUNK_SIZE (LSV_CHUNK_SIZE*8/LSV_GC_STORAGE_BIT_LEN)
/*
 * Number of LSV_CHUNK_SIZE-sized bitmap chunks needed to hold one entry for each of
 * LSV_MAX_CHUNK_NUM chunks; used below as the length of the bitmap chunk-id, lock and
 * record arrays.
 * NOTE(review): integer division truncates; this presumably relies on
 * LSV_MAX_CHUNK_NUM*LSV_GC_STORAGE_BIT_LEN being a multiple of 8*LSV_CHUNK_SIZE -- confirm.
 */
#define LSV_GC_BITMAP_CHUNK_NUM   (LSV_MAX_CHUNK_NUM*LSV_GC_STORAGE_BIT_LEN/8/LSV_CHUNK_SIZE)

/*===define old storage===*/

/*
 * Number of uint32_t slots left in LSV_CHUNK_SIZE bytes after two uint32_t header
 * fields; this is the length of storage_chkid[] in lsv_gc_old_storage_record_t,
 * whose header fields are count and next_chkid.
 */
#define LSV_GC_OS_CHUNK_NUM ((LSV_CHUNK_SIZE-sizeof(uint32_t)*2)/sizeof(uint32_t))

/**
 * chunk0 stores this module's boot information, in two parts:
 * - the chunk ids used for the log bitmap (LSV_GC_BITMAP_CHUNK_NUM entries; the
 *   original note gives 32 and refers to it as LSV_LOGGC_STORAGE_CHUNK_NUM)
 * - the boot information of the old storage
 *
 * LSV_GC_OS_HEAD and LSV_GC_BITMAP_HEAD are presumably the byte offsets of the two
 * parts inside that page (start, and half a page in) -- TODO confirm against users.
 */
#define LSV_GC_OS_HEAD      (0)
#define LSV_GC_BITMAP_HEAD  (LSV_PAGE_SIZE/2)

#pragma pack(8)

/**
 * @brief Old-storage boot record, persisted at the start of log_page_id; the second
 * half of that page begins with the chunk_id array of the LOG bitmap.
 *
 * The field names chunk_count, gc_chkid and ready_chkid match members of the
 * in-memory lsv_gc_old_storage_t.
 * NOTE(review): ready_chunk_count presumably mirrors ready_chunk.count of that
 * structure -- confirm in the flush/load code.
 */
typedef struct {
        uint32_t chunk_count;
        uint32_t gc_chkid;
        uint32_t ready_chkid;
        uint32_t ready_chunk_count;
} lsv_gc_old_storage_meta_t;

/**
 * @brief Array of LSV_GC_BITMAP_CHUNK_NUM chunk ids.
 *
 * NOTE(review): presumably the persisted list of chunks holding the gc bitmap,
 * i.e. the array that starts at the second half of the boot page -- confirm.
 */
typedef struct {
        uint32_t chunk_ids[LSV_GC_BITMAP_CHUNK_NUM];
} lsv_gc_old_storage_bitmap_meta_t;

/**
 * Persisted; the total size of this structure is 1M. cq + heap + os together make
 * up all of the logs.
 * gc considers all logs and picks the ones with the highest reclaim value.
 *
 * When the heap is full, entries are stored into the old storage and, at the same
 * time, a check is made whether fullgc is needed;
 * fullgc pulls records up again into the cq so they go through the whole gc process.
 *
 * Records form a singly linked list.
 */
typedef struct __lsv_loggc_old_storage_record {
        uint32_t count;       // presumably the number of valid entries in storage_chkid[] -- TODO confirm
        uint32_t next_chkid;  // presumably the chunk id of the next record in the list -- TODO confirm
        uint32_t storage_chkid[LSV_GC_OS_CHUNK_NUM];
} lsv_gc_old_storage_record_t;

#pragma pack()


/**
 * Basic unit of the gc process: represents one chunk taking part in gc.
 * It appears in the CQ, the heap and the hash index.
 *
 * A chunk may be in only one of CQ, heap and old storage at a time, never in more
 * than one of them.
 *
 */
typedef struct _lsv_loggc_logctrl {
        lsv_list_head_t hook; // list linkage; must be the first member

        uint32_t chunk_id;
        lsv_log_head_t head;

        // gc check needs the log head page information; caching it in memory trades
        // space for time and can improve performance

#if LSV_GC_LOGCTRL_HLOG_TYPE==1
        lsv_log_hlog_t hlog[LSV_LOG_PAGE_SIZE]; // embedded array variant; original note: size 4160
#else
        lsv_log_hlog_t* hlog; // pointer variant; original note: size 88
#endif

        // for debug
        int for_gc;

        // number of times this chunk took part in gc merge: the minimum copy count
        // over the pages in the chunk; related to whether it returns to the CQ
        uint8_t gc_count;

        // this value is collected when the bitmap is updated
        uint8_t gc_value;

        // per-page reference state, reflecting the real reclaim value;
        // it cannot be determined through valueup alone because snapshots hold references too
        lsv_log_page_bitmap_t lp_bm;
} lsv_gc_logctrl_t;

/**
 * @brief Check queue (CQ): a list head plus a counter.
 *
 * NOTE(review): count is presumably the queue length, by analogy with
 * lsv_gc_wait_queue_t -- confirm.
 */
typedef struct _lsv_loggc_check_queue {
        struct list_head queue;
        uint32_t count;
} lsv_gc_check_queue_t;

/* The valueup queue reuses the check-queue layout. */
typedef lsv_gc_check_queue_t lsv_gc_valueup_queue_t;

/**
 * @brief Wait queue: a list head with its length and an alive-page total.
 */
typedef struct _lsv_gc_wait_queue {
        struct list_head queue;
        uint32_t count;           // queue length
        uint32_t value;           // number of alive pages
} lsv_gc_wait_queue_t;

/**
 * @brief gc heap: a fixed-capacity array of logctrl pointers (LSV_GC_HEAP_SIZE slots)
 * plus the number of elements currently stored.
 */
typedef struct _lsv_gc_heap {
//	lsv_lock_t lock;
        uint32_t count;           // number of elements
        lsv_gc_logctrl_t *heap[LSV_GC_HEAP_SIZE];
} lsv_gc_heap_t;

/**
 * @brief In-memory state of the old storage.
 */
typedef struct __lsv_gc_old_storage {
        lsv_lock_t lock;  // protects <chunk_count, gc_chkid, ready_chkid, ready_chunk>

        // for fullgc, NOT used now
        uint32_t all_gc_value;

        // {{
        uint32_t chunk_count; // number of chunks in the old storage, not including the ready chunk
        uint32_t gc_chkid;    // next chunk to be full-gc'ed
        uint32_t ready_chkid; // next chunk to store into
        // }}

        // organized as a singly linked list: gc_chkid is the head, ready_chkid is the tail
        lsv_gc_old_storage_record_t ready_chunk; // data of the next chunk to store into
} lsv_gc_old_storage_t;

/*===end define old storage===*/

/**
 * @brief One bitmap chunk held in memory: a chunk id plus LSV_CHUNK_SIZE bytes of bitmap.
 *
 * NOTE(review): chunk_id is presumably the id of the chunk that stores this bitmap
 * data -- confirm against the bitmap load/flush code.
 */
typedef struct __lsv_gc_storage_record {
        uint32_t chunk_id;
        lsv_u8_t bitmap[LSV_CHUNK_SIZE];
} lsv_gc_bitmap_record_t;

#ifdef LSV_GC_STATISTICS

/**
 * @brief gc counters; dumped by DINFO_LOGGC_STATISTICS.
 */
typedef struct __lsv_gc_statistics {
        // the dump macro prints real_pages / storage_pages as a ratio
        uint64_t real_pages;
        uint64_t storage_pages;

        uint64_t all_free_count;   // chunks freed immediately
        uint64_t merge_free_count; // chunks freed by merge
        uint64_t merge_count;      // chunks that were merged

        // gc tasks running at the same time
        uint64_t task_count;
        uint64_t task_check_count;
        uint64_t task_heap_insert_count;
        uint64_t task_heap_merge_count;

        uint64_t task_heap_merge_merge_count;
        uint64_t task_heap_merge_write_count;
        uint64_t task_heap_merge_update_count;
        uint64_t task_heap_merge_free_count;

} lsv_gc_statistics_t;

/**
 * @brief Dump every counter of a gc statistics record through DINFO().
 *
 * Output fields, in order: page<real_pages, storage_pages, real/storage ratio>,
 * all_free, merge_free, free (= all_free + merge_free), merge, then the task_*
 * counters.
 *
 * The ratio is printed as 0.00 while storage_pages is still 0, instead of
 * dividing by zero and logging nan/inf.
 *
 * @param stat_info pointer to the statistics record; it is evaluated several
 *                  times, so pass an expression without side effects.
 */
#define DINFO_LOGGC_STATISTICS(stat_info) do { \
DINFO("loggc_stat_info:page<%llu,%llu,%.2lf>,all_free:%llu,merge_free:%llu,free:%llu,"\
        "merge:%llu,task:%llu,task_check:%llu,task_heap_insert:%llu,"\
        "task_heap_merge:%llu,task_heap_merge_merge:%llu,task_heap_merge_write:%llu,"\
        "task_heap_merge_update:%llu,task_heap_merge_free:%llu\n", \
        (LLU)(stat_info)->real_pages, \
        (LLU)(stat_info)->storage_pages, \
        ((stat_info)->storage_pages != 0 \
                ? (stat_info)->real_pages / (double)(stat_info)->storage_pages \
                : 0.0), \
        (LLU)(stat_info)->all_free_count, \
        (LLU)(stat_info)->merge_free_count, \
        (LLU)((stat_info)->all_free_count + (stat_info)->merge_free_count), \
        (LLU)(stat_info)->merge_count, \
        (LLU)(stat_info)->task_count, \
        (LLU)(stat_info)->task_check_count, \
        (LLU)(stat_info)->task_heap_insert_count, \
        (LLU)(stat_info)->task_heap_merge_count, \
        (LLU)(stat_info)->task_heap_merge_merge_count, \
        (LLU)(stat_info)->task_heap_merge_write_count, \
        (LLU)(stat_info)->task_heap_merge_update_count, \
        (LLU)(stat_info)->task_heap_merge_free_count); \
} while(0)

#endif

/**
 * @brief Per-volume gc state: check queue, valueup queue, heap, old storage,
 * the chunk_id => logctrl table, task queues and (optionally) the recovery bitmap
 * and statistics.
 */
typedef struct __lsv_loggc_context {
        // gc main
        uint32_t check_queue_size;
        uint32_t total_chunk_num;

        lsv_gc_check_queue_t check_queue;

        lsv_gc_valueup_queue_t valueup_queue;

        lsv_gc_heap_t gc_heap;
        lsv_gc_old_storage_t old_storage;

        // gc log_ctrl
        // lsv_rwlock_t log_ctrl_tab_lock;
        // map: chunk_id => logctrl
        hashtable_t logctrl_tab;

        // task queue
        co_mq_t gc_queue;
        co_mq_t fullgc_queue;

        // counter of gc tasks that have been launched
        uint64_t gc_count;
        easy_timer_t fullgc_timer;

#ifdef IS_STORAGE_BITMAP
        // storage for recovery
        lsv_u8_t bitmap_is_open;

        lsv_lock_t bitmap_lock[LSV_GC_BITMAP_CHUNK_NUM];

        // 32M+; records only the chunks that are in use but not yet persisted to the old storage.
        // A chunk that is freed, or persisted into the old storage, has its bitmap bit unset.
        // After an abnormal shutdown, the next volume load must recover: replay the bitmap
        // and add the chunks to the cq.
        // @todo setting or unsetting a bit requires writing one page of data; can this be batched?
        lsv_gc_bitmap_record_t *bitmaps[LSV_GC_BITMAP_CHUNK_NUM];
#endif

#ifdef LSV_GC_STATISTICS
        lsv_gc_statistics_t stat_info;
#endif
} lsv_gc_context_t;

/*
 * Task kinds: gc and valueup.
 * NOTE(review): presumably the values stored in lsv_gc_arg_t.msg_type -- confirm.
 * NOTE(review): "TPYE" looks like a misspelling of "TYPE"; the names are public, so
 * renaming would require updating every user.
 */
#define LSV_GC_ARG_TPYE_GC      (1)
#define LSV_GC_ARG_TPYE_VALUEUP (2)

/**
 * Descriptor of a gc or valueup task.
 */
typedef struct __lsv_gc_arg {
        lsv_list_head_t hook; // list linkage; must be the first member

        lsv_volume_proto_t *lsv_info;
        lsv_u8_t msg_type; // presumably one of LSV_GC_ARG_TPYE_* -- TODO confirm
        lsv_u8_t is_free;

        // for valueup
        uint32_t chunk_id;
        uint32_t chunk_page_off;

        // for gc
        lsv_u8_t type; // presumably a LSV_GC_TYPE_* bit mask, as taken by lsv_gc() -- TODO confirm
} lsv_gc_arg_t;

/**
 * @brief Context of a gc task: the volume and the task descriptor.
 *
 * The commented-out members below are kept from the original as disabled code.
 */
typedef struct {
        // struct list_head hook;
        // task_t task;
        lsv_volume_proto_t *lsv_info;
        // task 1
        lsv_gc_arg_t *gc_arg;
// task 2
//        lsv_u8_t type;
//        lsv_loggc_logctrl_t *logctrl_cqueue;
//        lsv_loggc_logctrl_t *logctrl_gc;
} lsv_gc_taskctx_t;

/**
 * @brief Argument bundle for adding a chunk to the check queue.
 *
 * NOTE(review): the field semantics below are inferred from names only -- confirm
 * against the code that fills and consumes this structure.
 */
typedef struct __lsv_gc_add2check_arg {
        lsv_volume_proto_t *lsv_info;
        uint32_t chunk_id;
        lsv_u8_t add_type;   // presumably LSV_CHECK_QUEUE_ADD_TYPE_HEAD or _TAIL
        task_t *wait_task;   // presumably the task waiting for completion
        uint32_t *count;     // presumably a counter shared with the waiter
        lsv_u8_t is_free;
} lsv_gc_add2check_arg_t;

/**
 * @brief Argument bundle for recovering one bitmap record.
 *
 * NOTE(review): the field semantics below are inferred from names only -- confirm
 * against the recovery code.
 */
typedef struct __lsv_gc_recovery_storage_record_arg {
        lsv_volume_proto_t *lsv_info;
        uint32_t idx;           // presumably the index into the bitmap record array
        uint32_t bitmap_chkid;  // presumably the chunk id holding that bitmap record
        task_t *wait_task;      // presumably the task waiting for completion
        uint32_t *count;        // presumably a counter shared with the waiter
        lsv_s32_t *rc;          // presumably where the result code is reported
        lsv_u8_t is_free;
} lsv_gc_recovery_bitmap_record_arg_t;


/**
 * @brief Whether a record is pulled from the CQ.
 *
 * The two low bits are read together:
 * - 00, 01: do not poll
 * - 10:     poll a log
 * - 11:     poll a log when the check queue is full
 */
#define LSV_GC_TYPE_BIT_POLLFULL (0x01) /* only poll a log when check_queue is full */
#define LSV_GC_TYPE_BIT_POLL     (0x02) /* poll a log */

/**
 * @brief Whether gc runs continuously; mind the balance between entries entering
 * and leaving the CQ.
 */
#define LSV_GC_TYPE_BIT_LOOP     (0x04) /* gc until gc_heap has no log left to gc */

/**
 * @brief Force gc merge regardless of the gc merge policy.
 */
#define LSV_GC_TYPE_BIT_FORCE    (0x08) /* gc anyway, without the LSV_LOGGC_GC_RATIO limit; gc_heap may be null */

/*
 * Ready-made combinations of the bits above.
 * LSV_GC_TYPE_NORMAL used to be spelled with LSV_LOGGC_GC_TYPE_BIT_* names, which
 * this header does not define, so it could not expand to a valid expression.
 */
#define LSV_GC_TYPE_LOOP   (LSV_GC_TYPE_BIT_POLLFULL|LSV_GC_TYPE_BIT_POLL|LSV_GC_TYPE_BIT_LOOP) /* 0x07 */
#define LSV_GC_TYPE_NORMAL (LSV_GC_TYPE_BIT_POLLFULL|LSV_GC_TYPE_BIT_POLL) /* 0x03 */

/*
 * Where a new entry is added to the check queue: at the head or at the tail.
 * NOTE(review): presumably the values carried in lsv_gc_add2check_arg_t.add_type -- confirm.
 */
#define LSV_CHECK_QUEUE_ADD_TYPE_HEAD (0)
#define LSV_CHECK_QUEUE_ADD_TYPE_TAIL (1)

// Lifecycle entry points of the gc module, one call per volume (lsv_info).
// NOTE(review): exact semantics and return codes are not visible from this header;
// the int result is presumably 0 on success -- confirm in the implementation.

int lsv_gc_create(lsv_volume_proto_t *lsv_info);

int lsv_gc_delete(lsv_volume_proto_t *lsv_info);

int lsv_gc_flush(lsv_volume_proto_t *lsv_info);

int lsv_gc_load(lsv_volume_proto_t *lsv_info);

/**
 * After an abnormal termination the chunks in the cq and the heap did not get
 * written into the os (old storage), so they must be recovered: all chunks are
 * rebuilt from the bitmap.
 *
 * @param lsv_info volume to recover
 * @return status code (values not documented in this header)
 */
int lsv_gc_recovery(lsv_volume_proto_t *lsv_info);

/**
 * @brief Add a log to the gc CQ; an entry point that connects foreground and
 * background tasks.
 *
 * @param lsv_info volume the log belongs to
 * @param log_proto log to add
 * @return status code (values not documented in this header)
 */
int lsv_gc_add(lsv_volume_proto_t *lsv_info, lsv_log_proto_t *log_proto);

/**
 * @brief add GC task to CQ.
 *
 * Tasks such as gc and valueup complete in two steps:
 * - the task is added to a queue
 * - the task is taken from the queue and executed
 *
 * @param lsv_info volume the log belongs to
 * @param log_ptoto log to add
 * @param is_offer not documented in this header -- TODO describe
 * @return status code (values not documented in this header)
 */
int lsv_gc_add_to_check_queue(lsv_volume_proto_t *lsv_info, lsv_log_proto_t* log_ptoto, lsv_u8_t is_offer);

/**
 * @brief Take one item from the CQ and run GC on it, or run GC continuously.
 *
 * gc tasks execute concurrently, but the number of concurrently running tasks must
 * be limited: the underlying scheduler has a bucket limit.
 *
 * @param lsv_info volume to collect
 * @param type LSV_GC_TYPE_LOOP or LSV_GC_TYPE_NORMAL
 * @return status code (values not documented in this header)
 */
int lsv_gc(lsv_volume_proto_t *lsv_info, lsv_u8_t type);

/**
 * @brief Called when the bitmap is updated by an overwrite.
 *
 * @param lsv_info volume the chunk belongs to
 * @param chunk_id chunk concerned by the update
 * @param chunk_page_off NOT used now
 * @return 0
 * @return ENOKEY
 */
int lsv_gc_valueup(lsv_volume_proto_t *lsv_info, uint32_t chunk_id, uint32_t chunk_page_off);

// NOTE(review): presumably processes the valueup requests queued by lsv_gc_valueup()
// -- confirm in the implementation.
int lsv_gc_valueup_commit(lsv_volume_proto_t *lsv_info);

/**
 * @brief fullgc
 *
 * fullgc is driven by a timer, or triggered by a check made when inserting into the
 * heap; it runs in co_mq mode.
 *
 * @param lsv_info volume whose fullgc timer is started
 * @return status code (values not documented in this header)
 */
int lsv_gc_fullgc_start_timer(lsv_volume_proto_t *lsv_info);

// NOTE(review): presumably the check that decides whether fullgc should run -- confirm.
int lsv_gc_fullgc_check(lsv_volume_proto_t *lsv_info);

// NOTE(review): purpose not documented in this header -- TODO describe.
int lsv_gc_setattr(lsv_volume_proto_t *lsv_info);

// Implemented in lsv_log_stat.c.
// NOTE(review): presumably these initialise and report the gc statistics counters
// -- confirm in lsv_log_stat.c.
void lsv_gc_stat_init(lsv_gc_context_t* loggc_context);
void lsv_gc_stat(lsv_volume_proto_t *lsv_info);

/**
 * @brief Record the chunk ids of the gc bitmap in chunk0.log_page_id.
 *
 * @param lsv_info volume whose boot page is written
 * @return status code (values not documented in this header)
 */
int lsv_gc_bitmap_set_head(lsv_volume_proto_t *lsv_info);

// NOTE(review): presumably sets (is_use != 0) or clears (is_use == 0) the gc bitmap
// bit of chunk_id -- confirm in the implementation.
int lsv_gc_bitmap_use(lsv_volume_proto_t *lsv_info, lsv_u32_t chunk_id, int is_use);

#endif /* __LSV_GC_H_ */
